package com.zk;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Zookeeper分布式锁示例 ?该类应该写成单例?
 *  生产中可用curtor框架，它屏避了一些复杂的操作
 *  以下代码只是展示实现原理，并不健全
 * <p>
 * ## zookeeper特性：
 * 1. 在Zookeeper中，znode是一个跟Unix文件系统路径相似的节点，可以往这个节点存储或获取数据。
 * 2. 通过客户端，可对znode进行增删改查等操作，还可以注册watcher监控znode的变化。
 * 3. zookeeper的同父子节点不可重名 [ 不同的父节点，子节点可重名]
 * <p>
 * ## zookeeper实现分布式锁原理
 * 1. 加锁：去创建指定名称的节点，如果创建成功，则获得锁（加锁成功）。如果节点已存在，就表示锁被别人获取了
 * 2. 释放锁：删除指定名称的节点
 * 3. zookeeper的同父子节点不可重名
 *    参考：https://blog.csdn.net/koflance/article/details/78616206
 * <p>
 * ## zookeeper节点类型
 * 1. 持久节点（persistent：默认）
 * 2. 持久顺序节点（persistent_sequential）
 * 3. 临时节点（ephemeral）：客户端关闭了就会消失
 * 4. 临时顺序节点 （ephemeral_sequential）:  && 实现分布式锁的关键 &&
 * <p>
 * ## 为什么要临时顺序节点实现分布式锁？
 * 1. 临时节点可以避免[ 在获取锁后释放锁前，重启服务时没有释放zookeeper服务里的锁，从而导致死锁的问题 ]
 * 2. 顺序节点可以避免[ 删除节点时，唤醒所有线程，引发所有线程去抢锁，就像所有海鸟去抢你手中的食物一样，从而导致惊群效应的问题 ]
 * 3. 顺序节点可保证一次唤醒一个线程
 * <p>
 * ## 惊群效应导致什么问题？
 * 1. 巨大的服务性能损耗
 * 2. 网络冲击
 * 3. 可能造成空宕机
 *    参考：https://blog.csdn.net/second60/article/details/81252106
 */
public class ZkDistributeLock implements Lock {

    // 父路径：持久节点，相当于basePath
    private String lockPath;

    private ZkClient client;

    /**
     * 用ThreadLocal保证了同一线程中，锁是可重入的，这里没有实现记录重入次数的功能
     */
    private ThreadLocal<String> currentPath = new ThreadLocal<String>();

    private ThreadLocal<String> beforePath = new ThreadLocal<String>();

    public ZkDistributeLock(String lockPath) {
        super();
        this.lockPath = lockPath;
        // 最好是做成单例的
        client = new ZkClient("localhost:2181");
        // 创建父节点
        if (!this.client.exists(lockPath)) {
            this.client.createPersistent(lockPath);
        }
    }

    public void lock() {
        if (!tryLock()) {
            // 阻塞
            this.waitForCall();
            // 唤醒后再次加锁
            this.lock();
        }
    }

    /**
     * 阻塞，监听到删除事件后唤醒
     */
    public void waitForCall() {
        final CountDownLatch cdl = new CountDownLatch(1);
        // 新建监听器
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println("由于分布锁的实现只有节点的新增和删除，所以是不会调用到这里的");
            }
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("监听到节点删除：" + s);
                cdl.countDown();
            }
        };
        // 注册监听器，监听前一个节点[ 这样可以实现类似链表的功能，为后面只唤醒一个线程作准备 ]
        client.subscribeDataChanges(beforePath.get(), listener);
        // 前面还有节点，则解除前面节点的监听器（多次循环这个判断，保证了只唤醒最最前面的一个节点：即最小节点）
        if (this.client.exists(beforePath.get())) {
            try {
                // 阻塞
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 解除监听器
        client.unsubscribeDataChanges(beforePath.get(), listener);
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock() {
        try {
            if (currentPath == null) {
                // 创建临时节点：防止拿到锁后，释放锁前服务重启导致zookeeper里的锁永远也不会释放掉
                // childLockName是需要参数里传过来的，即锁名称
                currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "childLockName"));
            }
            // 获取所有子节点
            List<String> children = this.client.getChildren(lockPath);
            // 排序
            Collections.sort(children);
            // 判断当前节点是否是最小的
            if (currentPath.equals(lockPath + "/" + children.get(0))) {
                return true;
            } else {
                // 当前不是最小节点时，要获取前一个节点
                int currentIdx = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
                beforePath.set(lockPath + "/" + children.get(currentIdx - 1));
                return false;
            }
        } catch (ZkNodeExistsException e) {
            return false;
        }
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void unlock() {
        // 删除节点
        this.client.delete(this.currentPath.get());
    }

    public Condition newCondition() {
        return null;
    }
}
